#ifndef WIN32
#include <netdb.h>
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#endif
#include <udt.h>
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include "cc.h"
#include "test_util.h"

using namespace std;

// Forward declarations for the helpers defined after main().
// NOTE(review): handleReadFds is only declared in the non-WIN32 branch even
// though main() calls it unconditionally — confirm whether WIN32 builds are
// still expected to work.
#ifndef WIN32
void* recvdata(const UDTSOCKET);
void handleReadFds(const std::set<UDTSOCKET>& readfds,
                   const UDTSOCKET& listen_sock_);
#else
DWORD WINAPI recvdata(LPVOID);
#endif

// UDT epoll id; assigned from UDT::epoll_create() in main() and used by
// handleReadFds() when registering accepted sockets.
static int udt_eid_ = 0;
// Event mask passed (by pointer) to UDT::epoll_add_usock for every socket:
// readable + error notifications.
static int read_event = UDT_EPOLL_IN | UDT_EPOLL_ERR;
// Byte counter: recvdata() adds to it, and the reporter task in main() prints
// and resets it roughly once per second.
static std::atomic_ullong tsize;

// Entry point of the UDT epoll-based throughput test server.
//
// Usage: appserver [server_port]   (the port defaults to 9000)
//
// Flow:
//   1. Resolve the local address, create the UDT listening socket, enlarge its
//      receive buffers, bind and listen.
//   2. Create a UDT epoll instance and register the listening socket.
//   3. Start a background task that prints the receive rate once per second.
//   4. Loop on UDT::epoll_wait and hand every readable socket to
//      handleReadFds() until epoll reports EINVPARAM or ECONNLOST.
//   5. Stop the reporter, release the epoll instance and close the socket.
//
// Returns 0 on every path (exit codes are unchanged from the original tool).
int main(int argc, char* argv[]) {
  if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) {
    cout << "usage: appserver [server_port]" << endl;
    return 0;
  }

  // Automatically start up and clean up UDT module.
  UDTUpDown _udt_;

  addrinfo hints;
  addrinfo* res = nullptr;

  memset(&hints, 0, sizeof(hints));

  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;
  // hints.ai_socktype = SOCK_DGRAM;

  string service("9000");
  if (2 == argc) service = argv[1];

  if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) {
    cout << "illegal port number or port is busy.\n" << endl;
    return 0;
  }

  UDTSOCKET serv =
      UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (UDT::INVALID_SOCK == serv) {
    cout << "socket: " << UDT::getlasterror().getErrorMessage() << endl;
    freeaddrinfo(res);
    return 0;
  }

  // UDT Options
  //   UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>,
  //                   sizeof(CCCFactory<CUDPBlast>));
  //   UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));
  // The option value is passed via a local; the previous `new int(...)`
  // arguments were never freed.
  const int rcvbuf_bytes = 10000000;
  if (UDT::ERROR == UDT::setsockopt(serv, 0, UDT_RCVBUF, &rcvbuf_bytes,
                                    sizeof(rcvbuf_bytes)))
    cout << "setsockopt UDT_RCVBUF: "
         << UDT::getlasterror().getErrorMessage() << endl;
  if (UDT::ERROR == UDT::setsockopt(serv, 0, UDP_RCVBUF, &rcvbuf_bytes,
                                    sizeof(rcvbuf_bytes)))
    cout << "setsockopt UDP_RCVBUF: "
         << UDT::getlasterror().getErrorMessage() << endl;

  // Release the address list before inspecting the result so it is freed on
  // the failure path as well.
  const int bind_state = UDT::bind(serv, res->ai_addr, res->ai_addrlen);
  freeaddrinfo(res);
  res = nullptr;
  if (UDT::ERROR == bind_state) {
    cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
    return 0;
  }

  cout << "server is ready at port: " << service << endl;

  if (UDT::ERROR == UDT::listen(serv, 10)) {
    cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;
    return 0;
  }

  // Set up epoll before the reporter starts so that a failure here can simply
  // return without having to stop a background task first.
  udt_eid_ = UDT::epoll_create();
  if (udt_eid_ < 0) {
    std::cout << "UDT::epoll_create error: "
              << UDT::getlasterror().getErrorMessage() << std::endl;
    UDT::close(serv);
    return 0;
  }
  {
    int add_usock_ret = UDT::epoll_add_usock(udt_eid_, serv, &read_event);
    if (add_usock_ret < 0)
      std::cout << "UDT::epoll_add_usock error: " << add_usock_ret << std::endl;
  }

  // Throughput reporter. A future returned by std::async blocks in its
  // destructor until the task finishes, so the task needs a stop flag;
  // otherwise main() could never return.
  std::atomic<bool> reporter_running(true);
  auto ft = std::async(std::launch::async, [&reporter_running]() {
    while (reporter_running) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      // Read and reset in one atomic step so bytes counted between the read
      // and the reset are not lost.
      const unsigned long long bytes = tsize.exchange(0);
      std::cout << "speed: " << static_cast<float>(bytes) / 1000000 << " MB/s"
                << std::endl;
    }
  });

  std::cout << "Run UDT server loop ...\n";
  int udt_running_ = 1;
  while (udt_running_) {
    std::set<UDTSOCKET> readfds;
    std::set<UDTSOCKET> writefds;
    int state = UDT::epoll_wait(udt_eid_, &readfds, &writefds, -1, NULL, NULL);
    if (state > 0) {
      // read
      handleReadFds(readfds, serv);
    } else if (state == 0) {
      std::cout << "." << std::flush;
    } else {
      const int err_code = UDT::getlasterror().getErrorCode();
      std::cout << "UDT epoll_wait: " << err_code << ' '
                << UDT::getlasterror().getErrorMessage() << std::endl;
      if ((CUDTException::EINVPARAM == err_code) ||
          (CUDTException::ECONNLOST == err_code)) {
        udt_running_ = 0;
        // UDT::epoll_remove_usock(eid, cur_sock);
      }
    }
  }

  // Ask the reporter to stop and wait for it (at most about one sleep period).
  reporter_running = false;
  ft.wait();

  std::cout << "release UDT epoll ..." << std::endl;
  int release_state = UDT::epoll_release(udt_eid_);
  if (release_state != 0)
    std::cout << "UDT epoll_release: " << release_state << std::endl;

  std::cout << "Close server ...";
  int close_state = UDT::close(serv);
  if (close_state != 0)
    std::cout << "UDT close:" << UDT::getlasterror().getErrorCode() << ' '
              << UDT::getlasterror().getErrorMessage() << std::endl;

  std::cout << "ok\n";
  return 0;
}

// Dispatches the sockets that UDT::epoll_wait reported as readable.
//
// readfds       sockets reported by epoll for this wake-up.
// listen_sock_  the listening socket; when it is reported, one pending
//               connection is accepted and the new socket is registered with
//               the global epoll instance (udt_eid_) using read_event.
//
// Every other socket is handed to recvdata() for a single receive.
void handleReadFds(const std::set<UDTSOCKET>& readfds,
                   const UDTSOCKET& listen_sock_) {
  for (const UDTSOCKET cur_sock : readfds) {
    if (cur_sock != listen_sock_) {
      recvdata(cur_sock);
      continue;
    }

    std::cout << "accept a new connection!" << std::endl;

    // sockaddr_storage is large enough for any address family, and the length
    // is initialised to the buffer size instead of being passed as garbage.
    sockaddr_storage addr;
    int addr_len = sizeof(addr);
    UDTSOCKET new_sock = UDT::accept(
        listen_sock_, reinterpret_cast<sockaddr*>(&addr), &addr_len);
    if (new_sock == UDT::INVALID_SOCK) {
      std::cout << "UDT accept:" << UDT::getlasterror().getErrorCode() << ' '
                << UDT::getlasterror().getErrorMessage() << std::endl;
      continue;
    }

    char clienthost[NI_MAXHOST];
    char clientservice[NI_MAXSERV];
    const int name_ret = getnameinfo(
        reinterpret_cast<sockaddr*>(&addr), addr_len, clienthost,
        sizeof(clienthost), clientservice, sizeof(clientservice),
        NI_NUMERICHOST | NI_NUMERICSERV);
    if (0 == name_ret) {
      cout << "new connection: " << clienthost << ":" << clientservice
           << endl;
    } else {
      // The host/service buffers are not guaranteed to be filled on failure,
      // so do not print them.
      cout << "new connection: <unresolved peer address, getnameinfo error "
           << name_ret << ">" << endl;
    }

    // add to readfds
    int add_usock_ret = UDT::epoll_add_usock(udt_eid_, new_sock, &read_event);
    if (add_usock_ret < 0)
      std::cout << "UDT::epoll_add_usock new_sock add error: "
                << add_usock_ret << std::endl;
  }
}

#ifndef WIN32
void* recvdata(const UDTSOCKET recver)
#else
DWORD WINAPI recvdata(LPVOID usocket)
#endif
{
  char* data;
  int size = 100000;
  data = new char[size];

  //   while (true) {
  int rsize = 0;
  int rs;
  //     while (rsize < size) {
  //       int rcv_size;
  //       int var_size = sizeof(int);
  //       UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size);
  if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) {
    cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
    // break;
  }
  tsize += rs;

  //       rsize += rs;
  //     }
  //   }

  delete[] data;

  //   UDT::close(recver);

#ifndef WIN32
  return NULL;
#else
  return 0;
#endif
}
